Cloud Composer(Airflow)でGCSFileTransformOperatorを使ってみる
はじめに
データアナリティクス事業本部のkobayashiです。
最近データパイプラインツールとしてGoogleCloudのCloud Composer(Airflow)をよく使っているのですが、Google Cloud Storage(以降GCS)上のファイルを加工してから次の工程へ流す場合に使われるオペレータを使ってみたのでその内容をまとめます。
GCSFileTransformOperator
GCS上のファイルをソースとしてそのファイルを加工してから再度GCSへアップロードする用途としてGCSFileTransformOperatorがあります。このオペレータはソースであるGCS上のファイルをローカルに一時的に保存してから変換スクリプトにてファイを変換し、変換後に再びGCSへアップロードするといった処理を行います。出力先は指定しなければソースファイルを上書きを行い、出力先を指定した場合はそのパスへ変換後のファイルを新規オブジェクトとしてアップロードします。
GCSFileTransformOperatorの使い方
GCSFileTransformOperator( task_id="gcs_transform", gcp_conn_id="{GCSのConnection}", source_bucket="{ソースバケット名}", source_object="{ソースオブジェクトのパス}", destination_bucket="{出力バケット名}", destination_object="{出力オブジェクトのパス}", transform_script=['python', 'script.py', 10], )
- gcp_conn_id: Airflowの管理コンソールで設定したConnection名
- source_bucket: ソースオブジェクトのあるバケット
- source_object: ソースオブジェクトのパス
- destination_bucket: 出力先のバケット
- 指定しない場合はsource_bucketと同一になる
- destination_object: 出力先のオブジェクトパス
- 指定しない場合はsource_objectと同一になる
- transform_script: 変換するスクリプトをリスト形式で指定
transform_scriptの書き方ですがリスト形式で指定します。ドキュメントを読んだだけでは少し指定の仕方が理解できなかったのですが、GCSFileTransformOperatorのソースファイルを読んだところリストで渡した値をsubprocess.Popenで実行しているだけだったのでPopenの使い方で記述すれば良いだけです。したがって変換スクリプトではPythonだけでなくシェルスクリプトなども使うことができます。
実際の処理はtransform_script
のリストにソースファイルと出力先ファイル名を付け加えてPopenで実行しています。変換スクリプトの書き方としてはソースファイルが第一引数、出力ファイルが第二引数として与えられるのでpythonですと以下のような形になります。
import sys import pandas as pd src_file=sys.argv[1] dest_file=sys.argv[2] df = pd.read_csv(src_file) # 何らかの変換 df.to_csv(dest_file)
またシェルスクリプトで変換ファイルを行う場合は以下のような形になります。
#! /bin/bash # (例)tsvをcsvへ変換 tr "\\t" "," < $1 > $2
- airflow.providers.google.cloud.operators.gcs — apache-airflow-providers-google Documentation
- subprocess — Subprocess management — Python 3.8.14 documentation
Cloud ComposerでGCSFileTransformOperatorを使ってみる
では実際にGCSFileTransformOperatorを使ってみたいと思います。処理の内容としてはGCS上にUTF-16でエンコードされたCSVがあり、これをUTF-8に変換するような処理を行います。
環境
- Composerバージョン: 2.0.28
- Airflowバージョン: 2.3.3
行う内容としては、AirflowでDAGを作成するdagファイルをPythonで記述します。また変換ファイルはPythonで記述してもよいのですが、ただ単にUTF-16をUTF-8に変換するだけなのでiconv
コマンドを使うシェルスクリプトを記述します。
ディレクトリ構成は以下になります。
Composer用のDAGファイルパス ├── transform_utf16.py └── scripts └── transform_utf16.sh
transform_utf16.py
import os from datetime import datetime from airflow import DAG from airflow.providers.google.cloud.operators.gcs import GCSFileTransformOperator work_dir = os.path.dirname(os.path.abspath(__file__)) with DAG( dag_id="transfer_utf16", start_date=datetime.now(), schedule_interval=None, default_args={"retries": 1}, tags=["gcs2bq"], ) as dag: gcs_to_gcs = GCSFileTransformOperator( task_id="gcs_transform", gcp_conn_id="google_cloud_storage_default", source_bucket="sample_bucket", source_object="user_events/20221219/utf16_sample.csv", destination_object="user_events/20221219/utf8_sample.csv", transform_script=["bash", f"{work_dir}/scripts/transform_utf16.sh"], )
ファイルを変換して同じパスに別名で保存するためdestination_bucket
は指定していません。
transform_utf16.sh
#! /bin/bash iconv -f UTF-16 -t UTF-8 $1 > $2
ソースファイルは$1
で取得でき出力先ファイルは$2
で取得できるので上記のような非常に簡単なワンライナーのコードになります。
あとはこれをCloud ComposerでTrigger DAGで実行すればGCS上のUTF-16のcsvファイルがUTF-8のcsvファイルに変換されます。
Cloud Composerを動かしているインスタンスのスペックにもよりますが今回の検証では12GB程度のファイルで14分ほどでUTF-16からUTF-8に変換が行うことができました。
まとめ
GoogleCloudのCloud Composer(Airflow)でGCSFileTransformOperatorを使ってGCS上のファイルを変換スクリプトで変換してみました。データパイプラインにてソースデータを加工することは往々にしてあることなのでGCSFileTransformOperatorは使い勝手が良いと思います。
ただ1点GCSFileTransformOperatorの使いにくい点としてGCSFileTransformOperatorではsource_objectとしてGCSオブジェクトを指定する必要があり、Prefixで複数ファイルを処理することができません。そのような場合はGCSTimeSpanFileTransformOperatorをつかってみるのが良いかと思います。
最後まで読んで頂いてありがとうございました。